{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Azure Machine Learning Pipeline with DataTransferStep\n",
        "This notebook demonstrates the use of DataTransferStep in an Azure Machine Learning Pipeline.\n",
        "\n",
        "In certain cases, you will need to transfer data from one data location to another. For example, your data may be in Azure SQL Database and you may want to move it to Azure Data Lake Storage. Or your data may be in an ADLS account and you want to make it available in Blob storage. The built-in **DataTransferStep** class helps you transfer data in these situations.\n",
        "\n",
        "The examples below show how to move data between the different storage types supported in Azure Machine Learning.\n",
        "\n",
        "## Data transfer currently supports the following storage types:\n",
        "\n",
        "| Data store | Supported as a source | Supported as a sink |\n",
        "| --- | --- | --- |\n",
        "| Azure Blob Storage | Yes | Yes |\n",
        "| Azure Data Lake Storage Gen 1 | Yes | Yes |\n",
        "| Azure Data Lake Storage Gen 2 | Yes | Yes |\n",
        "| Azure SQL Database | Yes | Yes |\n",
        "| Azure Database for PostgreSQL | Yes | Yes |\n",
        "| Azure Database for MySQL | Yes | Yes |"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Azure Machine Learning and Pipeline SDK-specific imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "import azureml.core\n",
        "from azureml.core.compute import ComputeTarget, DataFactoryCompute\n",
        "from azureml.exceptions import ComputeTargetException\n",
        "from azureml.core import Workspace, Experiment\n",
        "from azureml.pipeline.core import Pipeline\n",
        "from azureml.core.datastore import Datastore\n",
        "from azureml.data.data_reference import DataReference\n",
        "from azureml.pipeline.steps import DataTransferStep\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Initialize Workspace\n",
        "\n",
        "Initialize a workspace object from persisted configuration. If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure the config file is present at .\\config.json.\n",
        "\n",
        "If you don't have a config.json file, please go through the [configuration Notebook](https://aka.ms/pl-config) first.\n",
        "\n",
        "This sets you up with a working config file that contains your workspace name, subscription id, and related settings."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "create workspace"
        ]
      },
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\\n')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Register Datastores and create DataReferences\n",
        "\n",
        "For background on registering your data store, consult this article:\n",
        "\n",
        "https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-access-data\n",
        "\n",
        "> Please make sure to update the following code examples with appropriate values."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure Blob Storage\n",
        "\n",
        "> Since Blob Storage can contain a file and a directory with the same name, you can use the optional **source_reference_type** and **destination_reference_type** arguments of the DataTransferStep constructor to explicitly specify whether you're referring to the file or the directory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "datastore-remarks-sample"
        ]
      },
      "outputs": [],
      "source": [
        "from msrest.exceptions import HttpOperationError\n",
        "\n",
        "blob_datastore_name='MyBlobDatastore'\n",
        "account_name=os.getenv(\"BLOB_ACCOUNTNAME_62\", \"<my-account-name>\") # Storage account name\n",
        "container_name=os.getenv(\"BLOB_CONTAINER_62\", \"<my-container-name>\") # Name of Azure blob container\n",
        "account_key=os.getenv(\"BLOB_ACCOUNT_KEY_62\", \"<my-account-key>\") # Storage account key\n",
        "\n",
        "try:\n",
        "    blob_datastore = Datastore.get(ws, blob_datastore_name)\n",
        "    print(\"Found Blob Datastore with name: %s\" % blob_datastore_name)\n",
        "except HttpOperationError:\n",
        "    blob_datastore = Datastore.register_azure_blob_container(\n",
        "        workspace=ws,\n",
        "        datastore_name=blob_datastore_name,\n",
        "        account_name=account_name, # Storage account name\n",
        "        container_name=container_name, # Name of Azure blob container\n",
        "        account_key=account_key) # Storage account key\n",
        "    print(\"Registered blob datastore with name: %s\" % blob_datastore_name)\n",
        "\n",
        "blob_data_ref = DataReference(\n",
        "    datastore=blob_datastore,\n",
        "    data_reference_name=\"blob_test_data\",\n",
        "    path_on_datastore=\"testdata\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure Data Lake Storage Gen1\n",
        "\n",
        "Please consult the following articles for detailed steps on setting up service principal authentication and assigning the correct permissions to the Data Lake Storage account:\n",
        "\n",
        "- https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory\n",
        "- https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-data-lake-store#use-service-principal-authentication"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "datastore_name='MyAdlsDatastore'\n",
        "subscription_id=os.getenv(\"ADL_SUBSCRIPTION_62\", \"<my-subscription-id>\") # subscription id of ADLS account\n",
        "resource_group=os.getenv(\"ADL_RESOURCE_GROUP_62\", \"<my-resource-group>\") # resource group of ADLS account\n",
        "store_name=os.getenv(\"ADL_STORENAME_62\", \"<my-adls-account-name>\") # ADLS account name\n",
        "tenant_id=os.getenv(\"ADL_TENANT_62\", \"<my-tenant-id>\") # tenant id of service principal\n",
        "client_id=os.getenv(\"ADL_CLIENTID_62\", \"<my-client-id>\") # client id of service principal\n",
        "client_secret=os.getenv(\"ADL_CLIENT_SECRET_62\", \"<my-client-secret>\") # the secret of service principal\n",
        "\n",
        "try:\n",
        "    adls_datastore = Datastore.get(ws, datastore_name)\n",
        "    print(\"Found datastore with name: %s\" % datastore_name)\n",
        "except HttpOperationError:\n",
        "    adls_datastore = Datastore.register_azure_data_lake(\n",
        "        workspace=ws,\n",
        "        datastore_name=datastore_name,\n",
        "        subscription_id=subscription_id, # subscription id of ADLS account\n",
        "        resource_group=resource_group, # resource group of ADLS account\n",
        "        store_name=store_name, # ADLS account name\n",
        "        tenant_id=tenant_id, # tenant id of service principal\n",
        "        client_id=client_id, # client id of service principal\n",
        "        client_secret=client_secret) # the secret of service principal\n",
        "    print(\"Registered datastore with name: %s\" % datastore_name)\n",
        "\n",
        "adls_data_ref = DataReference(\n",
        "    datastore=adls_datastore,\n",
        "    data_reference_name=\"adls_test_data\",\n",
        "    path_on_datastore=\"testdata\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure Data Lake Storage Gen2\n",
        "\n",
        "Please consult the following article for detailed steps on setting up service principal authentication and assigning the correct permissions to the Data Lake Storage Gen2 account:\n",
        "\n",
        "https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-data-lake-storage#service-principal-authentication"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "adlsgen2_datastore_name = 'myadlsgen2datastore'\n",
        "account_name=os.getenv(\"ADLSGEN2_ACCOUNTNAME_62\", \"<my-account-name>\") # ADLS Gen2 account name\n",
        "tenant_id=os.getenv(\"ADLSGEN2_TENANT_62\", \"<my-tenant-id>\") # tenant id of service principal\n",
        "client_id=os.getenv(\"ADLSGEN2_CLIENTID_62\", \"<my-client-id>\") # client id of service principal\n",
        "client_secret=os.getenv(\"ADLSGEN2_CLIENT_SECRET_62\", \"<my-client-secret>\") # the secret of service principal\n",
        "\n",
        "try:\n",
        "    adlsgen2_datastore = Datastore.get(ws, adlsgen2_datastore_name)\n",
        "    print(\"Found ADLS Gen2 datastore with name: %s\" % adlsgen2_datastore_name)\n",
        "except HttpOperationError:\n",
        "    adlsgen2_datastore = Datastore.register_azure_data_lake_gen2(\n",
        "        workspace=ws,\n",
        "        datastore_name=adlsgen2_datastore_name,\n",
        "        filesystem='test', # Name of ADLS Gen2 filesystem\n",
        "        account_name=account_name, # ADLS Gen2 account name\n",
        "        tenant_id=tenant_id, # tenant id of service principal\n",
        "        client_id=client_id, # client id of service principal\n",
        "        client_secret=client_secret) # the secret of service principal\n",
        "    print(\"Registered datastore with name: %s\" % adlsgen2_datastore_name)\n",
        "\n",
        "adlsgen2_data_ref = DataReference(\n",
        "    datastore=adlsgen2_datastore,\n",
        "    data_reference_name='adlsgen2_test_data',\n",
        "    path_on_datastore='testdata')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure SQL Database\n",
        "\n",
        "To enable service principal authentication for an Azure SQL Database, follow this section of the Azure Data Factory documentation: https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-sql-database#service-principal-authentication\n",
        "\n",
        "> Note: When copying data **to** an Azure SQL Database, data will be _appended_ to an existing table. The source file must also have a header row whose names exactly match the column names in the destination table."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "sql_datastore_name=\"MySqlDatastore\"\n",
        "server_name=os.getenv(\"SQL_SERVERNAME_62\", \"<my-server-name>\") # Name of SQL server\n",
        "database_name=os.getenv(\"SQL_DATBASENAME_62\", \"<my-database-name>\") # Name of SQL database\n",
        "client_id=os.getenv(\"SQL_CLIENTNAME_62\", \"<my-client-id>\") # client id of service principal with permissions to access database\n",
        "client_secret=os.getenv(\"SQL_CLIENTSECRET_62\", \"<my-client-secret>\") # the secret of service principal\n",
        "tenant_id=os.getenv(\"SQL_TENANTID_62\", \"<my-tenant-id>\") # tenant id of service principal\n",
        "\n",
        "try:\n",
        "    sql_datastore = Datastore.get(ws, sql_datastore_name)\n",
        "    print(\"Found sql database datastore with name: %s\" % sql_datastore_name)\n",
        "except HttpOperationError:\n",
        "    sql_datastore = Datastore.register_azure_sql_database(\n",
        "        workspace=ws,\n",
        "        datastore_name=sql_datastore_name,\n",
        "        server_name=server_name,\n",
        "        database_name=database_name,\n",
        "        client_id=client_id,\n",
        "        client_secret=client_secret,\n",
        "        tenant_id=tenant_id)\n",
        "    print(\"Registered sql database datastore with name: %s\" % sql_datastore_name)\n",
        "\n",
        "from azureml.data.sql_data_reference import SqlDataReference\n",
        "\n",
        "sql_query_data_ref = SqlDataReference(\n",
        "    datastore=sql_datastore,\n",
        "    data_reference_name=\"sql_query_data_ref\",\n",
        "    sql_query=\"select top 1 * from TestData\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure Database for PostgreSQL"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "psql_datastore_name=\"MyPostgreSqlDatastore\"\n",
        "server_name=os.getenv(\"PSQL_SERVERNAME_62\", \"<my-server-name>\") # Name of PostgreSQL server\n",
        "database_name=os.getenv(\"PSQL_DATBASENAME_62\", \"<my-database-name>\") # Name of PostgreSQL database\n",
        "user_id=os.getenv(\"PSQL_USERID_62\", \"<my-user-id>\") # user id\n",
        "user_password=os.getenv(\"PSQL_USERPW_62\", \"<my-user-password>\") # user password\n",
        "\n",
        "try:\n",
        "    psql_datastore = Datastore.get(ws, psql_datastore_name)\n",
        "    print(\"Found PostgreSQL database datastore with name: %s\" % psql_datastore_name)\n",
        "except HttpOperationError:\n",
        "    psql_datastore = Datastore.register_azure_postgre_sql(\n",
        "        workspace=ws,\n",
        "        datastore_name=psql_datastore_name,\n",
        "        server_name=server_name,\n",
        "        database_name=database_name,\n",
        "        user_id=user_id,\n",
        "        user_password=user_password)\n",
        "    print(\"Registered PostgreSQL database datastore with name: %s\" % psql_datastore_name)\n",
        "\n",
        "from azureml.data.sql_data_reference import SqlDataReference\n",
        "\n",
        "psql_query_data_ref = SqlDataReference(\n",
        "    datastore=psql_datastore,\n",
        "    data_reference_name=\"psql_query_data_ref\",\n",
        "    sql_query=\"SELECT * FROM testtable\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Azure Database for MySQL"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Must differ from sql_datastore_name, otherwise Datastore.get returns the Azure SQL datastore\n",
        "mysql_datastore_name=\"MyMySqlDatastore\"\n",
        "server_name=os.getenv(\"MYSQL_SERVERNAME_62\", \"<my-server-name>\") # Name of MySQL server\n",
        "database_name=os.getenv(\"MYSQL_DATBASENAME_62\", \"<my-database-name>\") # Name of MySQL database\n",
        "user_id=os.getenv(\"MYSQL_USERID_62\", \"<my-user-id>\") # user id\n",
        "user_password=os.getenv(\"MYSQL_USERPW_62\", \"<my-user-password>\") # user password\n",
        "\n",
        "try:\n",
        "    mysql_datastore = Datastore.get(ws, mysql_datastore_name)\n",
        "    print(\"Found MySQL database datastore with name: %s\" % mysql_datastore_name)\n",
        "except HttpOperationError:\n",
        "    mysql_datastore = Datastore.register_azure_my_sql(\n",
        "        workspace=ws,\n",
        "        datastore_name=mysql_datastore_name,\n",
        "        server_name=server_name,\n",
        "        database_name=database_name,\n",
        "        user_id=user_id,\n",
        "        user_password=user_password)\n",
        "    print(\"Registered MySQL database datastore with name: %s\" % mysql_datastore_name)\n",
        "\n",
        "from azureml.data.sql_data_reference import SqlDataReference\n",
        "\n",
        "mysql_query_data_ref = SqlDataReference(\n",
        "    datastore=mysql_datastore,\n",
        "    data_reference_name=\"mysql_query_data_ref\",\n",
        "    sql_query=\"SELECT * FROM testtable\")\n",
        "\n",
        "mysql_table_data_ref = SqlDataReference(\n",
        "    datastore=mysql_datastore,\n",
        "    data_reference_name=\"mysql_table_data_ref\",\n",
        "    sql_table=\"testtable\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Setup Data Factory Account"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "data_factory_name = 'adftest'\n",
        "\n",
        "def get_or_create_data_factory(workspace, factory_name):\n",
        "    try:\n",
        "        return DataFactoryCompute(workspace, factory_name)\n",
        "    except ComputeTargetException as e:\n",
        "        if 'ComputeTargetNotFound' in e.message:\n",
        "            print('Data factory not found, creating...')\n",
        "            provisioning_config = DataFactoryCompute.provisioning_configuration()\n",
        "            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)\n",
        "            data_factory.wait_for_completion()\n",
        "            return data_factory\n",
        "        else:\n",
        "            raise\n",
        "\n",
        "data_factory_compute = get_or_create_data_factory(ws, data_factory_name)\n",
        "\n",
        "print(\"Setup Azure Data Factory account complete\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Create a DataTransferStep"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "**DataTransferStep** transfers data between the supported storage types: Azure Blob Storage, Azure Data Lake Storage (Gen1 and Gen2), Azure SQL Database, Azure Database for PostgreSQL, and Azure Database for MySQL.\n",
        "\n",
        "- **name:** Name of the step.\n",
        "- **source_data_reference:** Input connection that serves as the source of the data transfer operation.\n",
        "- **destination_data_reference:** Connection that serves as the destination of the data transfer operation.\n",
        "- **compute_target:** Azure Data Factory to use for transferring data.\n",
        "- **allow_reuse:** Whether the step should reuse the results of a previous DataTransferStep run when given the same inputs. Set to False to force the data to be transferred again.\n",
        "\n",
        "Two optional arguments explicitly specify whether a path corresponds to a file or a directory. They are useful when the storage contains both a file and a directory with the same name, or when creating a new destination path.\n",
        "\n",
        "- **source_reference_type:** An optional string specifying the type of source_data_reference. Possible values are 'file' and 'directory'. When not specified, the type of the existing path is used, or 'directory' if the path is new.\n",
        "- **destination_reference_type:** An optional string specifying the type of destination_data_reference. Possible values are 'file' and 'directory'. When not specified, the type of the existing path is used, or 'directory' if the path is new."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "transfer_adls_to_blob = DataTransferStep(\n",
        "    name=\"transfer_adls_to_blob\",\n",
        "    source_data_reference=adls_data_ref,\n",
        "    destination_data_reference=blob_data_ref,\n",
        "    compute_target=data_factory_compute)\n",
        "\n",
        "print(\"Data transfer step created\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "transfer_adlsgen2_to_blob = DataTransferStep(\n",
        "    name='transfer_adlsgen2_to_blob',\n",
        "    source_data_reference=adlsgen2_data_ref,\n",
        "    destination_data_reference=blob_data_ref,\n",
        "    compute_target=data_factory_compute)\n",
        "\n",
        "transfer_sql_to_blob = DataTransferStep(\n",
        "    name=\"transfer_sql_to_blob\",\n",
        "    source_data_reference=sql_query_data_ref,\n",
        "    destination_data_reference=blob_data_ref,\n",
        "    compute_target=data_factory_compute,\n",
        "    destination_reference_type='file')\n",
        "\n",
        "transfer_psql_to_blob = DataTransferStep(\n",
        "    name=\"transfer_psql_to_blob\",\n",
        "    source_data_reference=psql_query_data_ref,\n",
        "    destination_data_reference=blob_data_ref,\n",
        "    compute_target=data_factory_compute,\n",
        "    destination_reference_type='file')\n",
        "\n",
        "transfer_mysql_to_blob = DataTransferStep(\n",
        "    name=\"transfer_mysql_to_blob\",\n",
        "    source_data_reference=mysql_query_data_ref,\n",
        "    destination_data_reference=blob_data_ref,\n",
        "    compute_target=data_factory_compute)\n",
        "\n",
        "print(\"Data transfer steps created for ADLS Gen2, Azure SQL Database, PostgreSQL and MySQL\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Build and Submit the Experiment"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_01 = Pipeline(\n",
        "    description=\"data_transfer_01\",\n",
        "    workspace=ws,\n",
        "    steps=[transfer_adls_to_blob])\n",
        "\n",
        "pipeline_run_01 = Experiment(ws, \"Data_Transfer_example_01\").submit(pipeline_01)\n",
        "pipeline_run_01.wait_for_completion()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_02 = Pipeline(\n",
        "    description=\"data_transfer_02\",\n",
        "    workspace=ws,\n",
        "    steps=[transfer_sql_to_blob,transfer_psql_to_blob, transfer_adlsgen2_to_blob])\n",
        "\n",
        "pipeline_run_02 = Experiment(ws, \"Data_Transfer_example_02\").submit(pipeline_02)\n",
        "pipeline_run_02.wait_for_completion()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### View Run Details"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run_01).show()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.widgets import RunDetails\n",
        "RunDetails(pipeline_run_02).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Next: Databricks as a Compute Target\n",
        "To use Databricks as a compute target from Azure Machine Learning Pipeline, a DatabricksStep is used. This [notebook](https://aka.ms/pl-databricks) demonstrates the use of a DatabricksStep in an Azure Machine Learning Pipeline."
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "ADF"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "Azure Machine Learning Pipeline with DataTransferStep",
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.7.3"
    },
    "order_index": 4,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates the use of DataTransferStep"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}